#!/usr/bin/perl
use strict;
use threads;
use Thread::Queue;
use Getopt::Long qw(:config bundling no_ignore_case no_autoabbrev passthrough);
use POSIX;

# Command-line option values. getOption() fills $PORT, @DB_ARRAY, $BATCH_SIZE
# and $IS_HELP; checkOption() assigns $LOG_PATH.
my ($PORT,@DB_ARRAY,$BATCH_SIZE,$IS_HELP,$LOG_PATH);
# Database that queryResult() connects to. Starts as "postgres"; main()
# re-points it at each database being maintained.
my ($DATABASE_NAME) = ("postgres");
# Script name with any leading directory components stripped from $0.
(my $CMD_NAME = $0) =~ s!.*/(.*)!$1!;
# Process id left-padded with zeros and cut to its last six characters;
# printMessage() appends it to every timestamp.
my $MAIN_PID = substr("000000".$$,-6);

# Per-database work queue (table names) and result/message queue,
# re-created by doMaintenance().
my ($TASK_QUEUE,$MSG_QUEUE);
# Worker threads currently running and the single message-consumer thread;
# exitMain() kills whatever is still recorded here.
my (@TASK_THREAD,$MSG_THREAD);
# Number of catalog tables queued for the current database (progress total).
my ($CATALOG_SIZE);
# Allowed range and default for the -B worker count.
my ($BATCH_MAX,$BATCH_DEFAULT,$BATCH_MIN) = (10,5,1);
# Directory that receives the log file.
my ($LOG_PATH_DEFAULT) = ($ENV{"HOME"}."/gpAdminLogs");
# Append-mode handle of the log file, opened by checkOption().
my ($LOG_FILE_HANDLE);
# Messages logged before the log file was opened; flushed by logMessage().
my (@MESSAGE_CACHE);
# Logging state used by logMessage(): 0 = file not open yet (cache messages),
# 1 = file just opened (flush the cache first), 2 = write straight to the file.
my ($WRITE_TO_FILE) = 0;

# Lists every database that accepts connections.
my $GET_ALL_DATABASE_SQL = q#select datname from pg_database where datallowconn;#;
# Lists schema-qualified ordinary tables (relkind 'r') in pg_catalog, skipping
# relstorage 'x' (presumably Greenplum external tables -- confirm).
my $GET_ALL_CATALOG_SQL = q#select n.nspname||'.'||c.relname relname
    from pg_class c,pg_namespace n
    where c.relnamespace=n.oid and n.nspname='pg_catalog' and relkind='r' and relstorage<>'x';#;

# Usage text printed by getOption() when -h/--help is given. It is an
# interpolating qq{} string: $CMD_NAME and the $BATCH_* limits are expanded
# when this statement runs, and the "@" in the e-mail addresses is escaped
# so it is not treated as an array.
my $HELP_MESSAGE=qq{COMMAND NAME: $CMD_NAME
Developed by Miao Chen

Work Email:
michen\@pivotal.io
Private Email:
miaochen\@mail.ustc.edu.cn
************************************************************************************************
SYNOPSIS
************************************************************************************************
$CMD_NAME [-p|--port port] [-d|--database database [...]][-B batch_size] [-h|--help]
*****************************************************
OPTIONS
*****************************************************

-p|--port <master port>

  Database port number, If not specified, the default is 5432.
  eg.
  --port 5433
-d|--database <maintenance Database name>

  Maintenance system catalog in this database

-B <Batch size>

  Maintenance tables number at once.
  Default is $BATCH_DEFAULT.
  Max is $BATCH_MAX.
  Min is $BATCH_MIN.

example:
$CMD_NAME -d postgres -d template1 -B 4
$CMD_NAME -h | --help
};
# Echo a message to the console and hand back the text that was printed.
#   $flag    - severity tag. "RAW" prints $message untouched; any other tag
#              wraps it as "<timestamp><pid>-[<flag>]-:<message>\n".
#   $message - text to print.
# "ERROR" messages go to STDERR, everything else to STDOUT.
# Returns the (possibly decorated) message so callers can log it elsewhere.
sub printMessage{
    my ($flag,$message) = @_;
    unless("RAW" eq $flag){
        my $stamp = strftime("%Y%m%d:%H:%M:%S.",localtime).$MAIN_PID;
        $message = $stamp."-[".$flag."]-:".$message."\n";
    }
    # Pick the output stream once, then print through it.
    my $stream = ("ERROR" eq $flag) ? \*STDERR : \*STDOUT;
    print {$stream} $message;
    return $message;
}
# Print a message to the console and record it in the log file.
#   $flag    - severity tag, passed through to printMessage().
#   $message - text to log.
# Until the log file is open ($WRITE_TO_FILE is neither 1 nor 2) the formatted
# line is kept in @MESSAGE_CACHE. The first call after the file is opened
# ($WRITE_TO_FILE == 1) writes the cached lines first and switches to state 2.
sub logMessage{
    my ($flag,$message) = @_;
    my $line = printMessage($flag,$message);
    if($WRITE_TO_FILE != 1 && $WRITE_TO_FILE != 2){
        # Log file not available yet: remember the line for later.
        push @MESSAGE_CACHE,$line;
    }else{
        if($WRITE_TO_FILE == 1){
            # First write after opening: replay everything cached so far.
            print {$LOG_FILE_HANDLE} $_ for @MESSAGE_CACHE;
            @MESSAGE_CACHE = ();
            $WRITE_TO_FILE = 2;
        }
        print {$LOG_FILE_HANDLE} $line;
    }
}
# Report a fatal error and terminate the program with exit status 1.
#   $message - error text; printed with the "ERROR" tag (console only, it is
#              not passed through logMessage()).
# Also prints a one-line usage hint to STDERR. Does not return.
sub errorMessage{
    my $message = shift;
    printMessage("ERROR",$message);
    print {*STDERR} "Usage: $CMD_NAME [-h|--help] [options]\n";
    exitMain(1);
}
# Return a copy of the argument with leading and trailing whitespace removed.
#   $string - text to trim (interior whitespace is left alone).
sub trim{
    my $text = shift;
    $text =~ s/^\s+//;    # strip leading whitespace
    $text =~ s/\s+$//;    # strip trailing whitespace, including newlines
    return $text;
}
# Stop any threads still registered and exit the process.
#   $code - process exit status.
# The message thread (when one is recorded in $MSG_THREAD) is signalled first,
# followed by every worker in @TASK_THREAD; each is sent the thread-level
# 'KILL' signal and then detached so exit does not wait on it.
sub exitMain{
    my ($code) = @_;
    my @pending = @TASK_THREAD;
    unshift @pending,$MSG_THREAD if "" ne $MSG_THREAD;
    $_->kill('KILL')->detach() for @pending;
    exit $code;
}
# Run SQL through psql against $DATABASE_NAME on $PORT and parse the output.
#   $query_sql   - SQL text, fed to psql on stdin via a quoted here-document.
#   $return_flag - selects the return shape:
#                    "CV"     -> (exit_code, output_text); never aborts.
#                    "Scalar" -> output text as a single string.
#                    other    -> list of array refs, one per output line,
#                                fields split on the '#!^' separator.
# Output lines consisting solely of "SET" are discarded. stderr is merged into
# the output, so on failure the text carries psql's error message.
# For every flag except "CV" a failing command is reported via errorMessage(),
# which terminates the program.
sub queryResult{
    my ($query_sql,$return_flag) = @_;
    # Single-quote a value for /bin/sh so database names or ports containing
    # spaces or shell metacharacters cannot break (or inject into) the command.
    my $shell_quote = sub{
        my ($value) = @_;
        $value = "" if !defined $value;
        $value =~ s/'/'\\''/g;
        return "'".$value."'";
    };
    my $CMDS = "PGDATABASE=".$shell_quote->($DATABASE_NAME)
        ." PGPORT=".$shell_quote->($PORT)
        ." PGOPTIONS='-c optimizer=off -c client_encoding=UTF8' ";
    $CMDS = $CMDS."psql -tAXF '#!^' -v ON_ERROR_STOP=1 2>&1 <<'END_OF_SQL'\n";
    $CMDS = $CMDS.$query_sql."\n";
    $CMDS = $CMDS."END_OF_SQL\n";
    my $query_result = readpipe($CMDS);
    my $child_status = $?;
    my $return_code = $child_status >> 8;
    # A child killed by a signal has a zero high byte, and a failed launch
    # sets $? to -1; neither may be mistaken for success.
    if($child_status == -1 || ($child_status != 0 && $return_code == 0)){
        $return_code = 1;
    }
    $query_result = trim($query_result);
    my @rows_array = split(/\n/,$query_result);
    my @temp_array = ();
    my @return_list = ();
    for my $row(@rows_array){
        # Drop command tags echoed for SET statements.
        if($row eq "SET"){
            next;
        }
        push @temp_array,$row;
        push @return_list,[split(/#\!\^/,$row)];
    }
    $query_result = join("\n",@temp_array);
    if("CV" eq $return_flag){
        return ($return_code,$query_result);
    }
    if($return_code){
        errorMessage($query_result);
    }elsif("Scalar" eq $return_flag){
        return $query_result;
    }else{
        return @return_list;
    }
}
# Parse the command line into the option globals.
# Recognised options: -p/--port, -d/--database (repeatable), -B and -h/--help.
# Anything left in @ARGV after parsing is treated as an unknown parameter and
# is fatal. When help is requested the usage text is printed and the program
# exits with status 0.
sub getOption{
    my @option_spec = (
        'p|port:i'     => \$PORT,
        'd|database:s' => \@DB_ARRAY,
        'B:i'          => \$BATCH_SIZE,
        'h|help!'      => \$IS_HELP,
    );
    GetOptions(@option_spec);
    # "passthrough" leaves unrecognised arguments in @ARGV; reject them here.
    if(scalar(@ARGV) > 0){
        errorMessage("Some parameters unknown: [@ARGV]\nPlease refer to $CMD_NAME --help");
    }
    if($IS_HELP){
        print $HELP_MESSAGE;
        exitMain(0);
    }
}
# Validate the parsed options, fill in defaults and open the log file.
#   - $PORT falls back to 5432 when missing, empty or 0.
#   - @DB_ARRAY is reduced to the requested databases that exist and accept
#     connections (duplicates and unknown names are warned about), keeping
#     the order given on the command line. If none were requested, every
#     connectable database is used, in sorted order.
#   - $BATCH_SIZE falls back to $BATCH_DEFAULT when missing or out of range.
#   - The log directory is created and the daily log file opened for append,
#     unbuffered; logging then switches from the in-memory cache to the file.
# Fatal problems are reported through errorMessage(), which exits.
sub checkOption{
    # The option spec makes the -p value optional, so a bare "-p" yields 0;
    # treat that the same as "not specified" instead of passing PGPORT=0.
    if(!$PORT){
        $PORT = '5432';
    }
    my @db_list = queryResult($GET_ALL_DATABASE_SQL);
    my %db_hash = ();
    for my $row(@db_list){
        my ($db_name) = @$row;
        $db_hash{$db_name} = "";
    }
    my $specify_db_size = @DB_ARRAY;
    my %specify_db_hash = ();
    my @specify_db_list = ();
    for my $db_name(@DB_ARRAY){
        if(exists $db_hash{$db_name}){
            if(exists $specify_db_hash{$db_name}){
                logMessage("WARN","Database name duplicate: $db_name");
            }else{
                $specify_db_hash{$db_name} = "";
                # Remember first occurrences in command-line order.
                push @specify_db_list,$db_name;
            }
        }else{
            logMessage("WARN","Database name not exists: $db_name");
        }
    }
    @DB_ARRAY = @specify_db_list;
    if($specify_db_size > 0 && @DB_ARRAY < 1){
        errorMessage("All specify database not exists");
    }
    if(@DB_ARRAY == 0){
        logMessage("INFO","No databse name specify, will maintenance all databse");
        # Sorted so repeated runs process databases in the same order.
        @DB_ARRAY = sort keys %db_hash;
    }
    if("" eq $BATCH_SIZE || $BATCH_SIZE > $BATCH_MAX || $BATCH_SIZE < $BATCH_MIN){
        logMessage("NOTICE","Not specify or out of limit, use default($BATCH_DEFAULT): -B");
        $BATCH_SIZE = $BATCH_DEFAULT;
    }
    $LOG_PATH = $LOG_PATH_DEFAULT;
    logMessage("INFO","Log path:$LOG_PATH");
    # List form bypasses the shell, so a path with spaces or metacharacters is
    # passed intact. A failure here surfaces when the open below fails.
    system("mkdir","-p",$LOG_PATH);
    my $log_file = $LOG_PATH."/gpdbmaintenancecat_".strftime("%Y%m%d",localtime).".log";
    if(!open($LOG_FILE_HANDLE,">>",$log_file)){
        errorMessage("Can't open file:$log_file ($!)");
    }else{
        # Turn on autoflush for the log handle without changing the
        # currently selected output handle.
        my $stdout = select $LOG_FILE_HANDLE;
        $| = 1;
        select $stdout;
    }
    # State 1 tells logMessage() to flush the cached messages on its next call.
    $WRITE_TO_FILE = 1;
    logMessage("INFO","Log file:$log_file");
}
# Worker thread body: pull table names off $TASK_QUEUE until an undef
# terminator arrives, and VACUUM ANALYZE each one in $DATABASE_NAME.
# For every table two messages are pushed onto $MSG_QUEUE as shared array
# refs: an "INFO" start notice, then "SUCCESS" or "FAILED" (the latter with
# psql's output appended). A final undef tells the message thread that this
# worker is finished.
sub executeMaintenance{
    # Lets exitMain() stop this thread via $thread->kill('KILL').
    $SIG{'KILL'} = sub{threads->exit;};
    while(defined(my $table = $TASK_QUEUE->dequeue())){
        my $target = $DATABASE_NAME.".".$table;
        my @info_msg :shared = ("INFO","Start vacuum analyze ".$target);
        $MSG_QUEUE->enqueue(\@info_msg);
        # NOWAIT makes the lock attempt fail immediately instead of blocking
        # when the table is busy; "CV" returns the status instead of aborting.
        my $sql = "LOCK TABLE ".$table." IN SHARE UPDATE EXCLUSIVE MODE NOWAIT;VACUUM ANALYZE ".$table;
        my ($code,$value) = queryResult($sql,"CV");
        my @stat_msg :shared;
        if($code != 0){
            @stat_msg = ("FAILED",$target." ".$value);
        }else{
            @stat_msg = ("SUCCESS",$target);
        }
        $MSG_QUEUE->enqueue(\@stat_msg);
    }
    $MSG_QUEUE->enqueue(undef);
    return;
}
# Message thread body: drain $MSG_QUEUE and log every message from the workers.
# Each queue item is either an array ref [type, text] or undef. "SUCCESS" and
# "FAILED" items update the running counters and are logged with a
# "(success/failed/total)" prefix; other types are logged as-is. Each worker
# posts one undef when it finishes, so the loop ends once $BATCH_SIZE undefs
# have been seen.
# Returns the number of "FAILED" messages received.
sub executeMessage{
    # Lets exitMain() stop this thread via $thread->kill('KILL').
    $SIG{'KILL'} = sub{threads->exit;};
    my ($end_index,$error_index,$success_index) = (0,0,0);
    while(1){
        my $item = $MSG_QUEUE->dequeue();
        if(!defined $item){
            # One worker has finished; stop when all of them have.
            $end_index += 1;
            # Both sides are counters, so compare numerically rather than as strings.
            if($end_index == $BATCH_SIZE){
                last;
            }
            next;
        }
        my ($type,$text) = @$item;
        if("SUCCESS" eq $type){
            $success_index += 1;
            logMessage($type," ($success_index/$error_index/$CATALOG_SIZE) ".$text);
        }elsif("FAILED" eq $type){
            $error_index += 1;
            logMessage($type," ($success_index/$error_index/$CATALOG_SIZE) ".$text);
        }else{
            logMessage($type,$text);
        }
    }
    return $error_index;
}
# Maintain every pg_catalog table of the current database ($DATABASE_NAME).
#   $db_name - accepted but not referenced; the target database is taken
#              from the $DATABASE_NAME global.
# Builds fresh task/message queues, queues one task per catalog table, starts
# $BATCH_SIZE worker threads (each preceded by its own undef terminator on the
# task queue) plus one message thread, then waits for all of them.
# Returns the message thread's result, i.e. the number of failed tables.
sub doMaintenance{
    my ($db_name) = @_;
    $TASK_QUEUE = Thread::Queue->new();
    $MSG_QUEUE = Thread::Queue->new();
    my @catalog_list = queryResult($GET_ALL_CATALOG_SQL);
    $CATALOG_SIZE = scalar(@catalog_list);
    # The table name is the first (only) field of each result row.
    $TASK_QUEUE->enqueue($_->[0]) for @catalog_list;
    for(1 .. $BATCH_SIZE){
        # One terminator per worker so every thread eventually sees undef.
        $TASK_QUEUE->enqueue(undef);
        my $worker = threads->new(\&executeMaintenance);
        push @TASK_THREAD,$worker;
    }
    $MSG_THREAD = threads->new(\&executeMessage);
    $_->join() for @TASK_THREAD;
    # Clear the registry so exitMain() does not touch joined threads.
    @TASK_THREAD = ();
    my $failed_count = $MSG_THREAD->join();
    $MSG_THREAD = "";
    return $failed_count;
}
# Program entry point.
#   $_[0] - the full command line as a string, logged for reference.
# Parses and validates options, then maintains the catalog of every selected
# database in turn, logging a per-database and an overall summary.
# Exits with the number of databases that had at least one failure, capped at
# 255 (0 means everything succeeded). Does not return.
sub main{
    # Fails on threads versions that lack set_thread_exit_only.
    eval{threads->set_thread_exit_only(1);};
    if($@){
        errorMessage("Perl version is too old for multi threads.");
    }
    getOption();
    logMessage("INFO","Start catalog maintenance ".("." x 60));
    logMessage("INFO","Run command: ".$_[0]);
    checkOption();
    my $error_sum = 0;
    for my $db_name(@DB_ARRAY){
        # queryResult() and the worker threads read the target from this global.
        $DATABASE_NAME = $db_name;
        logMessage("INFO","Start maintenance catalog in database: $DATABASE_NAME".("." x 20));
        my $err_size = doMaintenance();
        if($err_size == 0){
            logMessage("INFO","Success maintenance catalog in database: $DATABASE_NAME");
        }else{
            $error_sum += 1;
            logMessage("WARN","End with error maintenance catalog in database: $DATABASE_NAME");
        }
    }
    if($error_sum == 0){
        logMessage("INFO","Finish catalog maintenance with all success".("." x 60));
    }else{
        logMessage("INFO","Finish catalog maintenance with $error_sum database error".("." x 60));
    }
    # Exit statuses are truncated to 8 bits, so an uncapped count of 256
    # failures would be reported to the caller as success (0).
    exitMain($error_sum > 255 ? 255 : $error_sum);
}
# Unbuffer the selected output handle (STDOUT) so console messages appear immediately.
$| = 1;
# Capture the command line before getOption() consumes @ARGV; array
# interpolation joins the arguments with single spaces.
my $cm_str = "$0 @ARGV";
main($cm_str);
